package com.jianying.day01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Streaming word count over an unbounded socket text source.
 *
 * <p>Reads text lines from a TCP socket, splits every line into whitespace-separated
 * words, keys the resulting {@code (word, 1)} pairs by word and prints the per-word
 * sum to standard output.
 *
 * <p>Usage: {@code Flink03_WC_Stream_UnBound [host [port]]}. When an argument is
 * omitted, {@link #DEFAULT_HOST} and {@link #DEFAULT_PORT} are used.
 *
 * <p>Package: com.jianying.day01
 *
 * <p>Created: 2024/3/29 14:13
 *
 * @author tubage
 * @version 1.0
 */
public class Flink03_WC_Stream_UnBound {
    /** Socket host used when no host argument is supplied. */
    private static final String DEFAULT_HOST = "EC2AMAZ-G85388I";

    /** Socket port used when no port argument is supplied. */
    private static final int DEFAULT_PORT = 8888;

    /**
     * Builds and submits the word-count job.
     *
     * @param args optional arguments: {@code args[0]} is the socket host,
     *             {@code args[1]} is the socket port
     * @throws NumberFormatException if {@code args[1]} is present but not a valid integer
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Resolve the source endpoint; fall back to the defaults so that running
        // without arguments behaves as before.
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        // Step 1: obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 2: read lines of text from the network socket.
        DataStreamSource<String> socketDS = env.socketTextStream(host, port);

        // Step 3: flatten each line into (word, 1) pairs.
        // An anonymous class (rather than a lambda) is kept here so that the generic
        // Tuple2<String, Long> output type is not lost to erasure.
        SingleOutputStreamOperator<Tuple2<String, Long>> flatMapDS = socketDS.flatMap(
                new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
                        // Split on runs of whitespace so repeated spaces and tabs
                        // do not produce spurious tokens.
                        for (String word : s.split("\\s+")) {
                            // A blank line or leading whitespace still yields one
                            // empty token; it must not be counted as a word.
                            if (!word.isEmpty()) {
                                collector.collect(Tuple2.of(word, 1L));
                            }
                        }
                    }
                }
        );

        // Step 4: group by the word (tuple field f0) using a typed key selector
        // instead of the deprecated positional keyBy(int).
        KeyedStream<Tuple2<String, Long>, String> keyByDS = flatMapDS.keyBy(value -> value.f0);

        // Step 5: sum the counts (tuple field at position 1) per key.
        SingleOutputStreamOperator<Tuple2<String, Long>> sumDS = keyByDS.sum(1);

        // Step 6: print the results.
        sumDS.print();

        // Step 7: submit the job.
        env.execute();
    }
}
